package com.zlm.common;

/**
 * Author: Harbour
 * Date: 2021-05-17 9:05
 * Desc: Custom data source that simulates marketing user-behavior data
 */

import com.zlm.bean.MarketingUserBehavior;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

/**
 * Flink source that emits an endless stream of randomly generated
 * {@link MarketingUserBehavior} records until {@link #cancel()} is called.
 *
 * <p>Each record is built from a random {@code long} id, a behavior picked
 * uniformly from {@code behaviorList}, a channel picked uniformly from
 * {@code channelList}, and the current value of
 * {@link System#currentTimeMillis()}.
 */
public class SimulateMarketingBehaviorSourceFunction implements SourceFunction<MarketingUserBehavior> {

    // Loop flag read by run() and cleared by cancel(). It is volatile because
    // Flink may call cancel() from a thread other than the one executing run();
    // without it the write is not guaranteed to become visible to the loop.
    private volatile boolean running = true;

    // Candidate values for the generated behavior and channel fields.
    private final List<String> behaviorList = Arrays.asList("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL");
    private final List<String> channelList = Arrays.asList("app store", "weibo", "wechat", "baidu");

    private final Random random = new Random();

    /**
     * Generates records in a loop and hands each one to {@code ctx} until the
     * source is cancelled.
     *
     * @param ctx the context used to emit generated records
     * @throws Exception declared by the {@link SourceFunction} contract
     */
    @Override
    public void run(SourceContext<MarketingUserBehavior> ctx) throws Exception {
        while (running) {
            long id = random.nextLong();
            String behavior = behaviorList.get(random.nextInt(behaviorList.size()));
            String channel = channelList.get(random.nextInt(channelList.size()));
            long timestamp = System.currentTimeMillis();
            ctx.collect(new MarketingUserBehavior(id, behavior, channel, timestamp));
            // No throttling: records are emitted back-to-back as fast as the
            // loop runs. Add a short sleep here if a bounded rate is needed.
        }
    }

    /**
     * Requests that {@link #run(SourceContext)} stop by clearing the loop flag;
     * the loop exits the next time it checks the flag.
     */
    @Override
    public void cancel() {
        running = false;
    }
}
